Before you turn this problem in, make sure everything runs as expected. First, restart the kernel (in the menubar, select Kernel$\rightarrow$Restart) and then run all cells (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says YOUR CODE HERE or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [1]:
NAME = "Grupo HDFS 22"
COLLABORATORS = "Esteban Braganza y Borja López"

Logotip de Spark

Actividad Cloud con AWS

Esta actividad consiste en que implementéis un pequeño proyecto de análisis de datos utilizando algunas de las herramientas que nos ofrece Amazon Web Services (AWS). Se trata, pues, de una implementación libre de un proyecto en la nube de AWS.

Para hacerlo, utilizaremos una serie de recursos a los que os hemos dado acceso en la plataforma educativa AWS Academy, que son prácticamente los mismos que tenemos en la versión comercial/empresarial, pero sin que tengáis que preocuparos por daros de alta ni por los costos de su uso. A la hora de implementar el proyecto, os recomendamos maximizar el uso de los servicios PaaS y minimizar el uso de servicios IaaS. Por ejemplo, podéis utilizar los servicios S3, Lambda, Kinesis, EMR, EC2, Glue, Athena o Redshift, pero sois libres de utilizar los servicios adecuados para la recolección, almacenamiento, procesamiento y análisis de datos en función de vuestro problema. Podréis encontrar la lista de servicios disponibles dentro de la plataforma AWS Academy.

En cuanto a la evaluación, esta se realizará a partir de las rúbricas que encontraréis en este documento para cada uno de los apartados.

AWS Academy, https://www.awsacademy.com/

Antes de empezar a desarrollar el proyecto, es muy recomendable realizar el curso AWS Academy Cloud Foundations al que os hemos dado acceso en la plataforma AWS Academy. En especial, os recomendamos hacer los apartados de prácticas de laboratorio que hay, donde se aprende a trabajar con EC2, S3, Lambda, etc.

Para acceder al curso, debéis iniciar sesión en la plataforma de AWS Academy en https://www.awsacademy.com/ con vuestro correo de UOC.

Veréis que tenéis acceso al curso AWS Cloud Foundations y a un laboratorio (Learner Lab) para hacer las prácticas. Tened en cuenta que la mayoría de los espacios de AWS Academy tienen una serie de restricciones (en especial, número de créditos y tiempo disponible) y, por tanto, debéis ser muy cuidadosos al guardar y documentar cualquier configuración que hagáis en el entorno de AWS Academy. Si superáis el presupuesto del laboratorio, se desactivará la cuenta del laboratorio y se perderá todo el progreso y los recursos.

El curso es bastante extenso, pero debéis leer y hacer las prácticas que hay en los siguientes módulos:

  • Módulo 1 - Información general sobre los conceptos de la nube
  • Módulo 5 - Redes y entrega de contenido
  • Módulo 6 - Informática
  • Módulo 7 - Almacenamiento
  • Módulo 8 - Bases de datos

image.png

image.png

image.png

En el caso del Learner Lab, image.png image-2.png

Puntuación de la actividad:

  • Ejercicio 1: Descripción y justificación de los servicios de AWS (2,5 puntos)
  • Ejercicio 2: Pruebas de procesamiento de datos (2,5 puntos)
  • Ejercicio 3: Validación de los KPIs (2,5 puntos)
  • Ejercicio 4: Incorporación de scripts o código relevante (2,5 puntos)

Ejercicio 1: Descripción y justificación de los servicios de AWS (2,5 puntos)

En este apartado, explicad y justificad los servicios de AWS que habéis seleccionado en vuestro proyecto, teniendo en cuenta los objetivos del proyecto y el tipo de datos con los que trabajáis. Describid detalladamente cómo se realizará el procesamiento de los datos en AWS. Incluid una explicación de los flujos de trabajo, las transformaciones de datos y cómo se gestionará la integración entre los diferentes servicios de AWS. Aseguraos de que la selección de los servicios se alinea con las necesidades específicas. Si es posible, se pide utilizar los iconos estándar de AWS para mostrar de forma visual vuestra propuesta de tratamiento de datos.

Rúbrica de Evaluación para la Descripción y Justificación de los Servicios de AWS, cada apartado 0,5 puntos

Categoría Excelente (4) Bueno (3) Satisfactorio (2) Necesita Mejora (1)
Claridad y Precisión La descripción y justificación de los servicios es muy clara y precisa. Toda la información es fácilmente comprensible. La descripción es clara y precisa, con pocos errores de comprensión. La descripción es generalmente clara, pero puede tener algunos errores u omisiones menores. La descripción es confusa y contiene muchos errores que dificultan la comprensión.
Justificación de los Servicios de AWS Seleccionados La justificación de los servicios seleccionados está bien argumentada y alineada con los objetivos del proyecto y el tipo de datos. La justificación está bien argumentada, pero puede faltar alguna alineación clara con los objetivos o datos. La justificación es básica y hay falta de alineación clara con los objetivos o datos. La justificación no está argumentada ni alineada con los objetivos del proyecto ni el tipo de datos.
Descripción del Procesamiento de Datos La descripción del procesamiento de los datos es detallada y cubre todos los aspectos importantes, incluidos los flujos de trabajo y las transformaciones. La descripción es detallada pero puede faltar alguna información específica. La descripción cubre los aspectos básicos pero faltan detalles importantes. La descripción es inadecuada y no cubre los aspectos importantes del procesamiento de datos.
Integración entre Servicios de AWS La integración entre los servicios está bien explicada y es coherente. Todos los servicios se integran de manera eficiente. La integración está bien explicada, pero podría mejorar en coherencia. La integración está descrita de manera básica pero faltan detalles y coherencia. La integración no está bien explicada y los servicios no se integran de manera efectiva.
Presentación Visual con Iconos de AWS Los iconos estándar de AWS están bien utilizados y la propuesta es visualmente clara y bien estructurada. Los iconos están utilizados pero la claridad visual puede mejorar. Los iconos están poco utilizados o mal colocados, lo que reduce la claridad visual. Los iconos no están utilizados y la propuesta es confusa visualmente.

Adjunta la explicación aquí. Las explicaciones son capturas de pantalla, de lo que demostraréis en la entrevista de la actividad.

Ejercicio 1: Descripción y Justificación de los Servicios de AWS

En el proyecto presentado, hemos diseñado una plataforma de análisis de datos de videojuegos utilizando servicios AWS. El objetivo es llegar a analizar los datos, capturando, alamcenando, procesanso y analizando en timepo real los datos.

El esquema utilizado es el siguiente:

Actividad%203%20%282%29.png

A continuación, detallamos los servicios de AWS seleccionados y justificamos su uso:

Amazon Kinesis Data Streams:

Nos permite capturar los datos de los juegos que obtenemos a través de aplicaciones móviles, servidores de juegos y navegadores web en tiempo real. Amazon Kinesis Data Stream es lo que necesitamos, ya que nos permite alamcenar y manejar el gran numero de eventos de los juegos que obtenemos a través de las fuentes.

Amazon Kinesis Data Firehose:

Se encarga de mover los datos de Kinesis Data Streams a Amazon S3 en tiempo real, para su almacenamiento en bruto. Mediante este paso intermedio, simplificamos la entrega de datos hacía el alamcenamiento duradero, como es "Amazon simple storage Services", es decir Amaxon S3.

Amazon Simple Storage Service (S3)

Descripción: Utilizamos S3 como el almacenamiento central del proyecto en tres capas:

  • Bronze Layer: Almacena datos en bruto directamente desde Kinesis Firehose.
  • Silver Layer: Contiene datos procesados y estructurados por evento, y será el encargado de guardar los datos, listos para el análisis.

S3 ofrece un almacenamiento económico, escalable y con alta durabilidad para nuestros datos, además la separación en capas permite un mayor control y gestión eficientes del ciclo de vida de los datos.

AWS Glue

Se utilizan dos tipos de AWS Glue:

  • Glue Jobs: se encarga de llevar a cabo la extracción, transformación y carga de los datos de forma automatica de la primera capa a la segunda, en nuestro caso la capa final. Además permite realizar trasnformaciones, como la limpieza, normalización y cálculos de agregación.

  • Glue Crawlers: se encarga actaulizar el catalogo de datos a través de la detección automatica de los esquemas de los datos almacenados en S3, lo cual, nos permite simplificar la gestión de los datos.

Amazon Athena

Permite ejecutar consultas SQL directamente de los datos almacenados en S3 y elimina la necesidad de mover datos a una base de datps tradicional.

Amazon QuickSight

Permite la creación de dashboards para facilitar la visualización de datos procesados.


Flujos de Trabajo y Procesamiento de Datos

  1. Se generan los datos por los jugadores a tarvés de aplicaciones móviles, servidores de juegos y navegadores web.
  2. Los eventos que se han producido se envían a Amazon Kinesis Data Streams, que se encarga de captura y procesar los datos.
  3. Kinesis Data Firehose transfiere los datos capturados hacia Amazon S3, donde se almacenan en la primera capa (Bronze layer).
  4. Glue Jobs se encarga de procesar los datos desde la primera capa hacía la segunda (Silver layer) y en nuestro caso, capa final.
  5. Utilizamos Amazon Athena para ejecutar consultas SQL directamente sobre la capas Silver.
  6. Los datos ya procesados se utilizan para la creación de dashboard mediante la herramientoAmazon QuickSight.

Gestión de la Integración entre Servicios

  • Conexión Kinesis-S3: Kinesis Firehose es el encargado de enviar los datos a Amazon S3 y elimina la necesidad de scripts personalizados.
  • S3-Glue: Se encarga de escanear contanteemente los nuevos datos y de actualizar la base.
  • Glue-Athena: Los catálogos generados por Glue Crawlers son utilizados por Athena para ejecutar consultas SQL.
  • Athena-QuickSight: QuickSight se conecta con Athena para consultar los datos transformados y llevar a cabo la visualización de datos.

Ejercicio 2: Pruebas de procesamiento de datos (2,5 puntos)

Realizad un conjunto de pruebas para aseguraros de que los datos se están procesando correctamente. Describid los pasos seguidos para verificar la precisión y la consistencia del procesamiento de datos. En este apartado debéis utilizar los servicios de persistencia de datos (EBS y/o S3), una máquina EC2, alguna de las bases de Amazon (RDS) y el uso de Lambda. Además, será necesario utilizar algún servicio adicional de los que ofrece Amazon y a los que tenéis acceso en el Learner Lab.

Rúbrica de Evaluación para la Verificación del Procesamiento de Datos, cada apartado vale 0,5 puntos

Ítem Excelente (4) Bueno (3) Satisfactorio (2) Necesita Mejora (1)
Descripción de los Pasos Seguidos La descripción de los pasos seguidos es muy clara, detallada y fácilmente comprensible. La descripción es clara y detallada con pocos errores de comprensión. La descripción es generalmente clara, pero puede tener algunos errores u omisiones menores. La descripción es confusa y contiene muchos errores que dificultan la comprensión.
Precisión y Consistencia de los Datos Las pruebas demuestran una precisión y consistencia excelente del procesamiento de datos con resultados fiables y consistentes. Las pruebas demuestran una precisión y consistencia buenas con pocos errores. Las pruebas demuestran una precisión y consistencia aceptables pero con algunos errores. Las pruebas demuestran una falta de precisión y consistencia significativa en el procesamiento de datos.
Uso de los Servicios de AWS (EBS/S3, EC2, RDS, Lambda) El uso de los servicios de AWS está bien justificado y se utilizan de manera eficiente para el procesamiento de datos. El uso de los servicios está bien justificado, pero puede faltar algo de eficiencia. El uso de los servicios es aceptable pero no totalmente eficiente o justificado. El uso de los servicios es inadecuado o no se justifica correctamente.
Integración entre Servicios de AWS La integración entre los servicios de AWS es excelente, con flujos de trabajo bien definidos. La integración es buena pero podría ser más eficiente. La integración es aceptable pero puede tener algunas deficiencias. La integración es inadecuada y no se completan los flujos de trabajo correctamente.
Presentación Visual con Iconos de AWS Los iconos estándar de AWS están bien utilizados y la propuesta es visualmente clara y bien estructurada. Los iconos están utilizados pero la claridad visual puede mejorar. Los iconos están poco utilizados o mal colocados, lo que reduce la claridad visual. Los iconos no están utilizados y la propuesta es confusa visualmente.

Adjunta la explicación aquí. Las explicaciones son capturas de pantalla, de lo que demostraréis en la entrevista de la actividad.

Las pruebas que hemos llevado a cabo para el procesamiento de datos son las siguiente:

Integridad del Almacenamiento en S3

Para asegurar que los datos llegan correctamente y se guardan y organizan en las capas "Bronze" y "Silver" en S3 llevamos a cabo los siguientes pasos:

  1. Carga de Datos de Prueba: Generamos un conjunto de datos ficticios de eventos de distintos juegos en un archivo JSON mediante un coigo .py.
  2. Prueba con Kinesis: Enviamos los datos generados anteriormente a Kinesis y validamos que se envian correctamente a la primera capa, es de cir, la capa "Bronze", que se encuentra en S3 a travñes de Kinesis Data Firehouse.
  3. Verificación de Almacenamiento: Verificamos que los archivos en la primera capa tienen el formato correcto.
Pruebas de Transformaciones con AWS Glue

Para asegurar que la transformación de los datos llevadas a cabo por Glue Jobs se ejecutan de forma adecuada, realizamos los siguientes pasos:

  1. Configurar un Glue Job para procesar los datos desde la capa Bronze a Silver:
    • Limpiar duplicados.
    • Manejar valores nulos (por ejemplo, reemplazar valores faltantes con un valor predeterminado).
    • Convertir los datos al formato Parquet.
  2. Validar los resultados, y confirmar que las transformaciones se han llevado a cabo correcatemnte.
Pruebas de Consultas con Amazon Athenea

Para asegurar que las consultas SQL realizadas sobre los datos procesados son precisas y retornan los resultados esperados, llevaremos a cabo los siguientes pasos:

  1. Importar los datos procesados desde la capa Silver a la base de datos de Athenea.
  2. Ejecutar alguna consulta SQL comun.
  3. Comparar los resultados de las consultas en Athenea con los datos originales almacenados en S3 para validar la precisión.

Comprobamos el esquema de la tabla de eventos que determinó el Crawler

El crawler de AWS determina el esquema de la tabla automáticamente. Debemos asegurarnos que la estructura Json del mismo incluya todas las posibles combinaciones para que no tenga problemas con nuevos datos. Hasta el momento de la entrega hemos obtenido 110.244 registros en la capa bronze.

CREATE EXTERNAL TABLE `raw_events_actividad_3`(
  `event` STRUCT<
    event_version:STRING,
    event_id:STRING,
    event_type:STRING,
    event_name:STRING,
    event_timestamp:INT,
    app_version:STRING,
    event_data:STRUCT<
      item_id:STRING,
      item_version:INT,
      platform:STRING,
      last_login_time:INT,
      last_screen_seen:STRING,
      level_id:STRING,
      level_version:INT,
      tutorial_screen_id:STRING,
      tutorial_screen_version:INT,
      report_id:STRING,
      report_reason:STRING,
      user_rating:INT,
      match_id:STRING,
      map_id:STRING,
      match_type:STRING,
      user_rank_reached:STRING,
      lootbox_id:STRING,
      lootbox_cost:INT,
      item_rarity:STRING,
      item_cost:INT,
      match_result_type:STRING,
      exp_gained:INT,
      most_used_spell:STRING,
      item_amount:INT,
      currency_type:STRING,
      country_id:STRING,
      currency_amount:INT,
      transaction_id:STRING,
      matched_slots:INT,
      spell_id:STRING,
      matching_failed_msg:STRING
    >
  ) COMMENT 'from deserializer',
  `application_id` STRING COMMENT 'from deserializer'
)
PARTITIONED BY ( 
  `partition_0` STRING, 
  `partition_1` STRING, 
  `partition_2` STRING, 
  `partition_3` STRING
)
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='application_id,event'
) 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://raw-events-actividad-3/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='crawler-actividad3', 
  'averageRecordSize'='1042', 
  'classification'='json', 
  'compressionType'='none', 
  'objectCount'='39', 
  'partition_filtering.enabled'='true', 
  'recordCount'='184420', 
  'sizeKey'='192945298', 
  'typeOfData'='file'
);

Comprobamos que hayan datos en la capa Bronze

image.png

Comprobamos las tablas de la capa Silver

Vemos que las tablas de los eventos procesados están correctamente guardadas se puede consultar sus datos e incluso conectarlos con Quicksight para su visualización. image.png

Pruebas de visualización de datos con Quicksight

Para segurara que los datos estan llegando correcatmente a las capas finales y que se pueden representar visualmente, llevaremos a cabo un pequeño Dashboard mediante Quicksight.

IMG-20250121-WA0005.jpg

Ejercicio 3: Validación de los KPIs (2,5 puntos)

Para aseguraros de que los KPIs (Key Performance Indicators) definidos están correctamente medidos y reflejan fielmente la realidad del análisis, es necesario seguir un proceso riguroso y meticuloso. A continuación, se describen los pasos y métodos utilizados para verificar la precisión y la relevancia de los KPIs:

1. Definición Clara y Precisa de los KPIs

  • Objetivos Específicos: Aseguraos de que cada KPI esté alineado con los objetivos específicos del proyecto u organización. Por ejemplo, si el objetivo es aumentar la satisfacción del cliente, los KPIs deberían medir aspectos directamente relacionados como el tiempo de respuesta y la calidad del servicio.
  • Medibles y Cuantificables: Los KPIs deben ser fácilmente medibles y cuantificables. Utilizad unidades de medida estándar y aseguraos de que se puedan recopilar datos de manera consistente.

2. Recopilación de Datos Consistente

  • Fuentes de Datos Fiables: Utilizad fuentes de datos fiables y verificadas para recopilar la información necesaria. Los datos deben ser recopilados de manera sistemática y coherente.
  • Métodos de Recopilación: Utilizad métodos de recopilación de datos adecuados, como encuestas, análisis de registros de sistemas y monitores de rendimiento en tiempo real.

3. Validación de la Precisión

  • Auditoría de Datos: Realizad auditorías periódicas para garantizar la precisión de los datos recopilados. Comprobad que los datos no contengan errores ni discrepancias.
  • Comparación con Estándares: Comparad los KPIs con estándares y benchmarks de la industria para aseguraros de que son razonables y realistas.

4. Análisis de la Relevancia

  • Análisis de Correlación: Utilizad análisis estadísticos para determinar si los KPIs están correlacionados con los resultados deseados. Esto ayudará a asegurar que los KPIs sean relevantes y proporcionen información significativa.
  • Feedback Continuo: Recopilad feedback de los stakeholders (partes interesadas) para aseguraros de que los KPIs reflejen sus preocupaciones y necesidades.

5. Revisión y Ajuste Continuos

  • Revisión Periódica: Revisad los KPIs regularmente para aseguraros de que continúen siendo relevantes a medida que evolucionan los objetivos y las circunstancias del proyecto.
  • Ajustes Necesarios: Realizad ajustes en los KPIs cuando sea necesario para reflejar mejor los cambios en el entorno del proyecto u organización.

6. Documentación de los Métodos Utilizados

  • Descripción Detallada: Documentad detalladamente los métodos y procedimientos utilizados para verificar la precisión y relevancia de los KPIs. Esto incluye las técnicas de recopilación de datos, los métodos de auditoría y los análisis estadísticos.
  • Transparencia: Mantened una transparencia total en el proceso para asegurar la confianza de los stakeholders en los KPIs definidos.

Siguiendo estos pasos, podréis validar que los KPIs definidos están correctamente medidos y reflejan fielmente la realidad del análisis, garantizando así que proporcionen información útil y accionable para la toma de decisiones.

Rúbrica de Evaluación para la Validación de los KPIs, cada apartado vale 0,5 puntos

Ítem Excelente (4) Bueno (3) Satisfactorio (2) Necesita Mejora (1)
Definición Clara y Precisa de los KPIs Los KPIs están claramente definidos, alineados con los objetivos del proyecto y son fácilmente medibles. Los KPIs están bien definidos y alineados, pero podría haber mejoras en la medibilidad. Los KPIs están parcialmente definidos y alineados, pero con carencias significativas en la medibilidad. Los KPIs no están claramente definidos ni alineados con los objetivos del proyecto.
Recopilación de Datos Consistente La recopilación de datos es sistemática y coherente, utilizando fuentes fiables y métodos adecuados. La recopilación de datos es generalmente consistente, pero con algunas omisiones o errores menores. La recopilación de datos es aceptable, pero hay inconsistencias o carencias en los métodos utilizados. La recopilación de datos no es consistente ni sistemática, con fuentes y métodos inadecuados.
Validación de la Precisión Las auditorías de datos son periódicas y detalladas, asegurando la precisión con comparaciones adecuadas con estándares. Las auditorías de datos son regulares, pero podrían ser más detalladas o frecuentes. Las auditorías de datos son ocasionales y con detalles insuficientes, afectando la precisión. No se realizan auditorías de datos adecuadas, y la precisión es inadecuada.
Análisis de la Relevancia Los análisis de correlación y el feedback continuo de los stakeholders aseguran que los KPIs son relevantes. Los análisis y el feedback son buenos, pero podrían ser más completos o frecuentes. Los análisis de correlación son superficiales, y el feedback no se recoge de manera consistente. No se realizan análisis de correlación ni se recoge feedback adecuadamente.
Revisión y Ajuste Continuo Los KPIs se revisan y ajustan regularmente para asegurar que continúan siendo relevantes y eficientes. La revisión y ajuste de los KPIs se hace periódicamente, pero con algunas carencias. La revisión de los KPIs es infrecuente y los ajustes son insuficientes. No se realizan revisiones ni ajustes regulares de los KPIs.

Adjunta la explicación aquí. Las explicaciones son capturas de pantalla, de lo que demostraréis en la entrevista de la actividad.

Los KPI que hemos seleccionado para el análisis, y que son frecuentes en game analytics son:

Desempeño Financiero
  • ARPU (Average Revenue Per User): Ingreso promedio generado por cada usuario en un período determinado.
  • LTV (Lifetime Value): Valor total que un usuario genera a lo largo de su ciclo de vida en la app.
Descargas e Instalaciones
  • INSTALLS: Número total de veces que la aplicación ha sido descargada e instalada.
  • CPI (Cost Per Install): Costo que se incurre por cada instalación de la aplicación.
Actividad y Participación
  • DAU (Daily Active Users): Número de usuarios activos en un día específico.
  • Session Length: Tiempo promedio que los usuarios pasan dentro de la aplicación por sesión.
Conversión
  • Conversion Rate: Porcentaje de usuarios que completan una acción específica deseada.
  • Funnel Conversion Rate: Tasa de usuarios que completan cada etapa del embudo de conversión.

Y para su validación hemos llevado a cabo estos pasos:

Definición Clara y Precisa de los KPIs
Grupo KPI Detalles
Desempeño Financiero ARPU (Average Revenue Per User) Objetivo Específico: Medir el ingreso promedio generado por cada usuario en un período determinado.
Medible y Cuantificable: Se calcula dividiendo el ingreso total generado por el número total de usuarios.
LTV (Lifetime Value) Objetivo Específico: Medir el valor total que un usuario genera a lo largo de su ciclo de vida en la app.
Medible y Cuantificable: Se calcula con modelos predictivos basados en ingresos promedio y duración estimada de actividad del usuario.
Descargas e Instalaciones INSTALLS Objetivo Específico: Medir el número total de veces que la aplicación ha sido descargada e instalada.
Medible y Cuantificable: Se mide utilizando datos de las plataformas de distribución (App Store, Google Play, etc.).
CPI (Cost Per Install) Objetivo Específico: Medir el costo que se incurre por cada instalación de la aplicación
Medible y Cuantificable: Se calcula dividiendo el costo total de las campañas de adquisición entre el número total de instalaciones generadas.
Actividad y Participación DAU (Daily Active Users) Objetivo Específico: Medir el número de usuarios activos en un día específico.
Medible y Cuantificable: Se mide directamente a través de eventos de inicio de sesión o sesión (session_start).
Session Length Objetivo Específico: Medir el tiempo promedio que los usuarios pasan dentro de la aplicación por sesión.
Medible y Cuantificable: Se mide calculando la duración promedio entre el inicio y el final de cada sesión registrada.
Conversión Conversion Rate Objetivo Específico: Medir el porcentaje de usuarios que completan una acción específica deseada.
Medible y Cuantificable: Se calcula dividiendo el número de usuarios que completan la acción por el número total de usuarios que iniciaron el proceso.
Funnel Conversion Rate Objetivo Específico: Medir el porcentaje de usuarios que completan una acción específica deseada.
Medible y Cuantificable: Se mide calculando el porcentaje de usuarios que pasan de una etapa a la siguiente dentro del embudo.
Recopilación de Datos Consistente
  • En este caso la recopilación de los datos y la fuente de la que se extraen se trata de un codigo pyhton.Los distintos eventos se han generado a través de un código que los genera de forma aleatoria, teniendo en cuenta una serie de condiciones dadas por el usuario.
Validación de la Precisión
  • Auditoría de Datos: Es importante hacer auditorías periódicas para verificar que los eventos estén correctamente registrados. En nuestro caso, es esencial verificar que no se este perdiendo o duplicando información, ya que nos estaría falsificando la información. Por eso es necesario que verifiquemos constantement que la información que ellega es correcta y que los datos se esten transofrmando correctamente a medida que avanza el procesamiento de datos.

  • Comparación con Estándares: Comparar los KPIs con benchmarks de la industria del juego móvil o con datos históricos del juego para asegurarse de que los valores de los KPIs sean razonables y estén alineados con lo esperado.

Análisis de relevancia
  • Analisis de correlación: | Grupo | KPI | Detalles | |:-------------------------|:----------------------------|:------------------------------------------------------------------------------------------------| | Desempeño Financiero | ARPU (Average Revenue Per User) | Analizar si el ARPU correlaciona con la rentabilidad general de la app y las estrategias de monetización. | | | LTV (Lifetime Value) | Verificar que el LTV esté alineado con los objetivos de crecimiento a largo plazo. | | Descargas e Instalaciones | INSTALLS | Confirmar que el número de instalaciones tenga un impacto directo en la base de usuarios activos. | | | CPI (Cost Per Install) | Validar que un CPI más bajo se traduzca en un crecimiento rentable de la base de usuarios. | | Actividad y Participación | DAU (Daily Active Users) | Correlacionar el DAU con otros KPIs como ingresos o tiempo de sesión. | | | Session Length | Determinar si sesiones más largas conducen a una mayor monetización o retención. | | Conversión | Conversion Rate | Asegurar que una tasa de conversión más alta resulte en mayores ingresos o valor agregado. | | | Funnel Conversion Rate | Analizar cada etapa del embudo para identificar puntos críticos donde se pierdan usuarios. |

  • Feedback Continuo: Hay que ir recopliando el feedback de los steakholders contantement para asegurar que los KPI sean relevantes. En nuestro caso, es relevante los ingresos que generan los usuarios, así como el tiempo que permanecen activos en el juego.

Revisión y Ajuste Continuos
Grupo KPI Detalles
Desempeño Financiero ARPU (Ingresos Promedio por Usuario) Análisis y ajuste del ARPU en función de cambios en ingresos o estructura de usuarios.
LTV (Valor de Vida del Cliente) Actualización de cálculos del LTV basados en cambios en estrategias de retención o monetización.
Descargas e Instalaciones INSTALACIONES Revisión de métricas tras la implementación de campañas de adquisición.
CPI (Costo por Instalación) Ajustes en el CPI según los impactos de las estrategias de adquisición.
Actividad y Participación DAU (Usuarios Activos Diarios) Ajuste de la definición de DAU con la introducción de nuevos eventos significativos.
Duración de la Sesión Modificación de los criterios ante cambios en los patrones de uso detectados.
Conversión Tasa de Conversión Adaptación de la definición de la tasa de conversión según nuevos objetivos.
Tasa de Conversión del Embudo Análisis de las tasas de conversión tras rediseñar el embudo.
Documentación de los Métodos Utilizados
  • Será necesaria la documentación de los calculos aplicados para los distintos KPI, demás de asegurar de que todos los involucrados en el análisis tengan acceso a la documentación denecsaria y comprendan cómo se calculan y validan estas métricas.

Ejercicio 4: Incorporación de scripts o código relevante (2,5 puntos)

Incluid los scripts o código más relevantes que han sido utilizados durante la implementación del proyecto. Aseguraos de que están suficientemente descritos para facilitar la comprensión de las operaciones realizadas.

Durante la implementación del proyecto en el AWS Academy Learner Lab, es importante incorporar y documentar los scripts o código utilizados. Es necesario detallar los siguientes apartados:

1. Selección de los Scripts Relevantes

  • Componentes Clave: Seleccionad scripts esenciales para el funcionamiento del proyecto.
  • Priorización: Priorizad scripts con impacto directo en los objetivos del proyecto.

2. Descripción de los Scripts

  • Objetivo: Explica el propósito de cada script.
  • Funcionamiento: Describe cómo funciona el código, entradas y salidas.
  • Dependencias: Especifica las librerías o servicios de AWS utilizados.

3. Ejemplos de Código

  • Fragmentos: Incluye fragmentos de código con comentarios.
  • Casos de Uso: Proporciona ejemplos prácticos de uso del código.

4. Documentación

  • Guías: Incluye guías detalladas para implementar y ejecutar cada script.
  • Resolución de Problemas: Proporciona soluciones a errores comunes.

5. Revisión y Validación

  • Pruebas: Realiza pruebas para asegurar el correcto funcionamiento de los scripts.
  • Feedback: Recoge feedback para mejorar los scripts y la documentación.

Rúbrica de Evaluación para la Incorporación de scripts o código relevante, cada apartado vale 0,5 puntos

Ítem Excelente (4) Bueno (3) Satisfactorio (2) Necesita Mejora (1)
Selección de los Scripts Relevantes Los scripts seleccionados son esenciales y priorizan el impacto directo en los objetivos del proyecto. Los scripts seleccionados son relevantes pero podrían incluir más componentes clave. La selección de scripts es básica e incluye algunos componentes clave pero omite otros importantes. La selección de scripts es inadecuada y no incluye los componentes esenciales.
Descripción de los Scripts La descripción del propósito, funcionamiento y dependencias es muy clara y detallada. La descripción es clara pero puede faltar algún detalle menor. La descripción es aceptable pero faltan detalles importantes para la comprensión completa. La descripción es confusa o inadecuada, con muchos detalles importantes omitidos.
Ejemplos de Código Los ejemplos de código incluyen fragmentos bien comentados y casos de uso prácticos y relevantes. Los ejemplos de código son buenos pero podrían incluir más comentarios o casos de uso. Los ejemplos de código son aceptables pero con comentarios o casos de uso insuficientes. Los ejemplos de código son inadecuados, con comentarios o casos de uso mínimos o inexistentes.
Documentación Las guías son detalladas y proporcionan soluciones a errores comunes de manera clara y efectiva. Las guías son buenas pero podrían ser más detalladas o incluir más soluciones. Las guías son aceptables pero con información insuficiente sobre la implementación y resolución de problemas. Las guías son inadecuadas y no proporcionan información útil para la implementación.
Revisión y Validación Las pruebas de funcionalidad son completas y el feedback es recogido e implementado para mejorar el código. Las pruebas de funcionalidad son buenas pero podrían ser más completas. Las pruebas de funcionalidad son aceptables pero con deficiencias en algunos aspectos. Las pruebas de funcionalidad son inadecuadas y no se recoge ni implementa feedback.

Adjunta la explicación aquí. Las explicaciones son capturas de pantalla, de lo que demostraréis en la entrevista de la actividad.

A continuación se encuentran los dos scripts más relevantes del proyecto y sus explicaciones.

En primer lugara para el script que genera los datos del proyecto, publish_data.py:

Apartado Descripción
Objetivo Creación de los eventos del juego que se van a analizar en el proyecto y evniarlos a Kinesis para publicar archivos simulando un productor de datos.
Funcionamiento Simula la generación de eventos parecidos a los que se enviarían desde una aplicación móvil y los envía a Kinesis.
Entradas Nombre del Data Stream, Región de AWS, ID de la aplicación.
Salidas Eventos enviados a Kinesis publicados en batches de tamaño batch_size.
Dependencias os, argparse, boto3. Utiliza el servicio Amazon S3.
Fragmento de Código image.png
Ejemplo de Uso Este fragmento genera un evento aleatorio y lo envía a un stream de Kinesis llamado game-analytics-stream.
Guía de Implementación 1. Configura credenciales de AWS.
2. Define bucket y ruta fuente.
3. Ejecuta el script proporcionando argumentos necesarios.
Resolución de Problemas - Error de credenciales: Verifica AWS_ACCESS_KEY y AWS_SECRET_ACCESS_KEY.
- Archivos no encontrados: Confirma rutas locales.
Pruebas - Realiza pruebas con buckets y rutas de prueba.
- Valida los archivos subidos en S3.
Feedback Se recopila retroalimentación para mejorar el funcionamiento y la documentación.

Y en segundo lugar silver_glue_job.py:

Apartado Descripción
Objetivo Procesar datos almacenados en S3 utilizando AWS Glue y transformarlos en tablas aptas para Athena.
Funcionamiento Usa pyspark para leer, procesar y escribir datos en formato Parquet en S3.
Entradas Ruta de datos en S3.
Salidas Datos procesados almacenados en un bucket S3 en formato Parquet.
Dependencias pyspark, boto3, AWS Glue.
Fragmento de Código image-2.png
Ejemplo de Uso Filtrar eventos específicos, seleccionar y transformar columnas, eliminar duplicados y eliminar datos.
Guía de Implementación 1. Configura el entorno Glue en AWS.
2. Asegúrate de que los datos estén disponibles en S3.
3. Crea un trabajo Glue con el script.
Resolución de Problemas - Errores en schema: Ajusta definiciones de DynamicFrame.
- Permisos S3: Verifica políticas del bucket.
Pruebas Procesa un dataset pequeño y valida que la estructura resultante sea correcta.
Feedback Se recopila retroalimentación para mejorar el funcionamiento y la documentación.

Documentación del script de generación y envío de datos a Amazon Kinesis

Descripción

Este script genera y envía datos aleatorios o predefinidos a un flujo de Kinesis en AWS. Es útil para realizar pruebas de ingestión de datos en tiempo real y para desarrollar aplicaciones de análisis de datos basadas en Kinesis.


Requisitos

Dependencias

El script requiere las siguientes bibliotecas de Python:

  • boto3: Cliente de AWS SDK para Python.
  • json: Para la manipulación de datos JSON.
  • random: Para generar datos aleatorios.
  • time: Para trabajar con marcas de tiempo.
  • uuid: Para generar identificadores únicos.
  • argparse: Para manejar argumentos de línea de comandos.
  • numpy: Para operaciones avanzadas con datos.
  • access_keys: Archivo que contiene las claves de acceso AWS (ACCESS_KEY y SECRET_KEY).

Instale las dependencias usando:

pip install boto3 numpy

Uso del Script

Ejecución Básica

El script se ejecuta desde la línea de comandos y acepta varios argumentos configurables. Por defecto, genera eventos aleatorios y los envía indefinidamente al flujo de Kinesis.

Comando de Ejecución

python publish_data.py --region <REGIÓN_AWS> --stream-name <NOMBRE_FLUJO> --application-id <ID_APLICACIÓN>
Parámetro Requerido Descripción
--region Región AWS donde se encuentra el flujo de Kinesis.
--stream-name Nombre del flujo de Kinesis al que se enviarán los datos.
--application-id Identificador de la aplicación que envía los datos.
--batch-size No Tamaño del lote de eventos enviados en cada solicitud. Valor por defecto: 5.
--input-filename No Archivo de entrada con eventos JSON predefinidos, uno por línea. Si se usa, el script termina al procesarlo.

Estructura del Código

El script se organiza en las siguientes secciones principales:

  1. Importación de Módulos
    Importa bibliotecas necesarias como boto3, json, uuid, time, random, y otras para la generación y transmisión de datos.

  2. Constantes y Valores Predeterminados
    Define valores predeterminados para los eventos y parámetros, como DEFAULT_EVENT_VERSION y DEFAULT_BATCH_SIZE.

  3. Funciones

    • parse_cmd_line: Analiza los argumentos de la línea de comandos.
    • getUUIDs: Genera una lista de identificadores UUID.
    • getEventType: Selecciona un tipo de evento aleatorio basado en probabilidades preconfiguradas.
    • getEvent: Crea un evento aleatorio utilizando datos predefinidos.
    • generate_event: Combina datos generales y específicos para crear un evento completo.
    • send_record_batch: Envía un lote de eventos al flujo de Kinesis.
    • send_events_infinite: Envía eventos en un bucle infinito, generándolos o leyendo desde un archivo.
  4. Configuración Principal y Ejecución

    • Configura los parámetros de AWS y Kinesis según los argumentos de la línea de comandos.
    • Crea un cliente de Kinesis utilizando boto3.
    • Llama a send_events_infinite para iniciar el envío de eventos al flujo.
  5. Ejecución Condicional
    Utiliza if __name__ == '__main__': para garantizar que el script solo se ejecute directamente y no como un módulo importado.

In [ ]:
# Publish Data Generador de datos provisto por AWS .

import boto3 # type: ignore
import json
import random
from random import choice
import time
from datetime import datetime
import uuid
import os
import numpy
import argparse
from access_keys import ACCESS_KEY, SECRET_KEY

# Event Payload defaults
DEFAULT_EVENT_VERSION = '1.0.0'
DEFAULT_BATCH_SIZE = 5



def parse_cmd_line():
    """Parse the command line and extract the necessary values."""

    parser = argparse.ArgumentParser(description='Send data to a Kinesis stream for analytics. By default, the script '
                                                 'will send events infinitely. If an input file is specified, the '
                                                 'script will instead read and transmit all of the events contained '
                                                 'in the file and then terminate.')

    # REQUIRED arguments
    kinesis_regions = boto3.session.Session().get_available_regions('kinesis')
    parser.add_argument('--region', required=True, choices=kinesis_regions, type=str,
                        dest='region_name', metavar='kinesis_aws_region',
                        help='The AWS region where the Kinesis stream is located.')
    parser.add_argument('--stream-name', required=True, type=str, dest='stream_name',
                        help='The name of the Kinesis stream to publish to. Must exist in the specified region.')
    parser.add_argument('--application-id', required=True, type=str, dest='application_id',
                        help='The application_id to use when submitting events to ths stream (i.e. You can use the default application for testing).')
    # OPTIONAL arguments
    parser.add_argument('--batch-size', type=int, dest='batch_size', default=DEFAULT_BATCH_SIZE,
                        help='The number of events to send at once using the Kinesis PutRecords API.')
    parser.add_argument('--input-filename', type=str, dest='input_filename',
                        help='Send events from a file rather than randomly generate them. The format of the file'
                             ' should be one JSON-formatted event per line.')

    return parser.parse_args()

# Returns array of UUIDS. Used for generating sets of random event data
def getUUIDs(dataType, count):
    uuids = []
    for i in range(0, count):
        uuids.append(str(uuid.uuid4()))
    return uuids    

# Randomly choose an event type from preconfigured options
def getEventType():
  event_types = {
        1: 'user_registration',
        2: 'user_knockout',
        3: 'item_viewed',
        4: 'iap_transaction',
        5: 'login',
        6: 'logout',
        7: 'tutorial_progression',
        8: 'user_rank_up',
        9: 'matchmaking_start',
        10: 'matchmaking_complete',
        11: 'matchmaking_failed',
        12: 'match_start',
        13: 'match_end',
        14: 'level_started',
        15: 'level_completed',
        16: 'level_failed',
        17: 'lootbox_opened',
        18: 'user_report',
        19: 'user_sentiment'
  }
  return event_types[numpy.random.choice([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19], 1, p=[0.04, 0.05, 0.18, 0.02, 0.1, 0.06, 0.04, 0.03, 0.025, 0.025, 0.01, 0.03, 0.03, 0.08, 0.08, 0.08, 0.04, 0.04, 0.04])[0]]
  
# Generate a randomized event from preconfigured sample data
def getEvent(event_type, SERVERS, MATCHES):
    
    levels = [
        '1',
        '2',
        '3',
        '4',
        '5'
    ]
    
    countries = [
        
        'UNITED STATES',
        'UK',
        'JAPAN',
        'SINGAPORE',
        'AUSTRALIA',
        'BRAZIL',
        'SOUTH KOREA',
        'GERMANY',
        'CANADA',
        'FRANCE'
    ]

    items = getUUIDs('items', 10)
    
    currencies = [
        'USD',
        'EUR',
        'YEN',
        'RMB'
    ]
    
    platforms = [
        'nintendo_switch',
        'ps4',
        'xbox_360',
        'iOS',
        'android',
        'pc',
        'fb_messenger'
    ]

    tutorial_screens = [
        '1_INTRO',
        '2_MOVEMENT',
        '3_WEAPONS',
        '4_FINISH'
    ]

    match_types = [
        '1v1',
        'TEAM_DM_5v5',
        'CTF'
    ]

    matching_failed_msg = [
        'timeout',
        'user_quit',
        'too_few_users'
    ]

    maps = [
        'WAREHOUSE',
        'CASTLE',
        'AIRPORT'
    ]

    game_results = [
        'WIN',
        'LOSE',
        'KICKED',
        'DISCONNECTED',
        'QUIT'
    ]

    spells = [
        'WATER',
        'EARTH',
        'FIRE',
        'AIR'
    ]
    
    ranks = [
        '1_BRONZE',
        '2_SILVER',
        '3_GOLD',
        '4_PLATINUM',
        '5_DIAMOND',
        '6_MASTER'
    ]
    
    item_rarities = [
        'COMMON',
        'UNCOMMON',
        'RARE',
        'LEGENDARY'
        
    ]
    
    report_reasons = [
        'GRIEFING',
        'CHEATING',
        'AFK',
        'RACISM/HARASSMENT'
        
    ]
    
    switcher = {
        'login': {
            'event_data': {
                'platform': str(numpy.random.choice(platforms, 1, p=[0.2, 0.1, 0.3, 0.15, 0.1, 0.05, 0.1])[0]),
                'last_login_time': int(time.time())-random.randint(40000,4000000)
            }
        },
        
        'logout': {
            'event_data': {
                'last_screen_seen': 'the last screen'
            }
        },
        
        'client_latency': {
            'event_data': {
                'latency': numpy.random.choice((random.randint(40,185),1)),
                'connected_server_id': str(random.choice(SERVERS)),
                'region': str(random.choice(countries))   
            }
        },
        
        'user_registration': {
            'event_data': {
                'country_id': str(numpy.random.choice(countries, 1, p=[0.3, 0.1, 0.2, 0.05, 0.05, 0.02, 0.15, 0.05, 0.03, 0.05])[0]),
                'platform': str(numpy.random.choice(platforms, 1, p=[0.2, 0.1, 0.3, 0.15, 0.1, 0.05, 0.1])[0])
            }
        },
        
        'user_knockout': {
            'event_data': {
                'match_id': str(random.choice(MATCHES)),
                'map_id': str(numpy.random.choice(maps, 1, p=[0.6, 0.3, 0.1])[0]),
                'spell_id': str(numpy.random.choice(spells, 1, p=[0.1, 0.4, 0.3, 0.2])[0]),
                'exp_gained': random.randint(1,2)
            }
        },
        
        'item_viewed': {
             'event_data': {
                'item_id': str(numpy.random.choice(items, 1, p=[0.125, 0.11, 0.35, 0.125, 0.04, 0.01, 0.07, 0.1, 0.05, 0.02])[0]),
                'item_version': random.randint(1,2)
            }
        },

        'iap_transaction': {
            'event_data': {
                'item_id': str(numpy.random.choice(items, 1, p=[0.125, 0.11, 0.35, 0.125, 0.04, 0.01, 0.07, 0.1, 0.05, 0.02])[0]),
                'item_version': random.randint(1,2),
                'item_amount': random.randint(1,4),
                'currency_type': str(numpy.random.choice(currencies, 1, p=[0.7, 0.15, 0.1, 0.05])[0]),
                'country_id': str(numpy.random.choice(countries, 1, p=[0.3, 0.1, 0.2, 0.05, 0.05, 0.02, 0.15, 0.05, 0.03, 0.05])[0]),
                'currency_amount': random.randint(1,10),
                'transaction_id': str(uuid.uuid4())
            }
        },
    
        'tutorial_progression': {
            'event_data': {
                'tutorial_screen_id': str(numpy.random.choice(tutorial_screens, 1, p=[0.3, 0.3, 0.2, 0.2])[0]),
                'tutorial_screen_version': random.randint(1,2)
            }
        },

        'user_rank_up': {
            'event_data': {
                'user_rank_reached': str(numpy.random.choice(ranks, 1, p=[0.25, 0.35, 0.2, 0.15, 0.0499, 0.0001])[0])
            }
        },
        
        'matchmaking_start': {
            'event_data': {
                'match_id': str(random.choice(MATCHES)),
                'match_type': str(numpy.random.choice(match_types, 1, p=[0.4, 0.3, 0.3])[0])
            }
        },

        'matchmaking_complete': {
            'event_data': {
                'match_id': str(random.choice(MATCHES)),
                'match_type': str(numpy.random.choice(match_types, 1, p=[0.6, 0.2, 0.2])[0]),
                'matched_slots': random.randrange(start=6, stop=10)
            }
        },

        'matchmaking_failed': {
            'event_data': {
                'match_id': str(random.choice(MATCHES)),
                'match_type': str(numpy.random.choice(match_types, 1, p=[0.35, 0.2, 0.45])[0]),
                'matched_slots': random.randrange(start=1, stop=10),
                'matching_failed_msg': str(numpy.random.choice(matching_failed_msg, 1, p=[0.35, 0.2, 0.45])[0])
            }
        },

        'match_start': {
            'event_data': {
                'match_id': str(random.choice(MATCHES)),
                'map_id': str(numpy.random.choice(maps, 1, p=[0.3, 0.3, 0.4])[0])
            }
        },

        'match_end': {
            'event_data': {
                'match_id': str(random.choice(MATCHES)),
                'map_id': str(numpy.random.choice(maps, 1, p=[0.3, 0.3, 0.4])[0]),
                'match_result_type': str(numpy.random.choice(game_results, 1, p=[0.4, 0.4, 0.05, 0.05, 0.1])[0]),
                'exp_gained': random.randrange(start=100, stop=200),
                'most_used_spell': str(numpy.random.choice(spells, 1, p=[0.1, 0.4, 0.2, 0.3])[0])
            }
        },
        
        'level_started': {
            'event_data': {
                'level_id': str(numpy.random.choice(levels, 1, p=[0.2, 0.2, 0.2, 0.2, 0.2])[0]),
                'level_version': random.randint(1,2)
            }
        },
        'level_completed': {
            'event_data': {
                'level_id': str(numpy.random.choice(levels, 1, p=[0.6, 0.2, 0.12, 0.05, 0.03])[0]),
                'level_version': random.randint(1,2)
            }
        },
        'level_failed': {
            'event_data': {
                'level_id': str(numpy.random.choice(levels, 1, p=[0.001, 0.049, 0.05, 0.3, 0.6])[0]),
                'level_version': random.randint(1,2)
            }
        },
        
        'lootbox_opened': {
            'event_data': {
                'lootbox_id': str(uuid.uuid4()),
                'lootbox_cost': random.randint(2,5),
                'item_rarity': str(numpy.random.choice(item_rarities, 1, p=[0.5, 0.3, 0.17, .03])[0]),
                'item_id': str(numpy.random.choice(items, 1, p=[0.125, 0.11, 0.35, 0.125, 0.04, 0.01, 0.07, 0.1, 0.05, 0.02])[0]),
                'item_version': random.randint(1,2),
                'item_cost': random.randint(1,5)
            }
        },

        'user_report': {
            'event_data': {
                'report_id': str(uuid.uuid4()),
                'report_reason': str(numpy.random.choice(report_reasons, 1, p=[0.2, 0.5, 0.1, 0.2])[0])
            }
        },
        
        'user_sentiment': {
            'event_data': {
                'user_rating': random.randint(1,5)
            }
        }
    }
    
    return switcher[event_type]
    

# Take an event type, get event data for it and then merge that event-specific data with the default event fields to create a complete event
def generate_event():
    SERVERS = getUUIDs('str', 300)
    MATCHES = getUUIDs('str', 300)
    event_type = getEventType()
    # Within the demo script the event_name is set same as event_type for simplicity.
    # In many use cases multiple events could exist under a common event type which can enable you to build a richer data taxonomy.
    event_name = event_type
    event_data = getEvent(event_type, MATCHES=MATCHES, SERVERS=SERVERS)
    event = {
        'event_version': DEFAULT_EVENT_VERSION,
        'event_id': str(uuid.uuid4()),
        'event_type': event_type,
        'event_name': event_name,
        'event_timestamp': int(time.time()),
        'app_version': str(numpy.random.choice(['1.0.0', '1.1.0', '1.2.0'], 1, p=[0.05, 0.80, 0.15])[0])
    }
    
    event.update(event_data)
    return event
    
def send_record_batch(kinesis_client, stream_name, raw_records):
    """Send a batch of records to Amazon Kinesis."""

    # Translate input records into the format needed by the boto3 SDK
    formatted_records = []
    for rec in raw_records:
        formatted_records.append({'PartitionKey': rec['event']['event_name'], 'Data': json.dumps(rec)})
        kinesis_client.put_records(StreamName=stream_name, Records=formatted_records)
    print('Sent %d records to stream %s.' % (len(formatted_records), stream_name))

def send_events_infinite(kinesis_client, stream_name, batch_size, application_id):
    """Send a batches of randomly generated events to Amazon Kinesis."""
    
    while True:
        records = []
        # Create a batch of random events to send
        for i in range(0, batch_size):
            event_dict = generate_event()
            record = {
                'event': event_dict,
                'application_id': application_id
            }
            records.append(record)
        send_record_batch(kinesis_client, stream_name, records)
        time.sleep(random.randint(1,7))

if __name__ == '__main__':
    args = parse_cmd_line()
    aws_region = args.region_name
    kinesis_stream = args.stream_name
    batch_size = args.batch_size or DEFAULT_BATCH_SIZE
    application_id = args.application_id
    
    print('===========================================')
    print('CONFIGURATION PARAMETERS:')
    print('- KINESIS_STREAM: ' + kinesis_stream)
    print('- AWS_REGION: ' + aws_region)
    print('- APPLICATION_ID: ' + application_id)
    SERVERS = getUUIDs('servers', 3)
    MATCHES = getUUIDs('matches', 50)
    print('===========================================\n')
    
    session = boto3.Session()
    client = session.client('kinesis', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY ,region_name=aws_region)
    
    send_events_infinite(client, kinesis_stream, batch_size, application_id)

Documentación: Procesamiento de la capa Silver

Este script realiza el procesamiento de datos desde la capa Raw hacia la capa Silver en un entorno de AWS Glue, transformando y estructurando la información para facilitar análisis posteriores. A continuación, se describe su funcionamiento en detalle.


Propósito

El propósito de este script es filtrar, transformar y almacenar datos en la capa Silver. Se enfoca en cuatro tipos principales de eventos:

  • lootbox_opened
  • level_started, level_completed, level_failed
  • iap_transaction
  • match_start, match_end

Los datos resultantes se guardan en formato Parquet en Amazon S3, organizados por particiones de tiempo (año, mes, día).


Funcionamiento

1. Importación de Módulos

El script utiliza bibliotecas clave para el procesamiento:

  • AWS Glue: Para interactuar con el catálogo y realizar transformaciones en los datos.
  • PySpark: Para manipular y procesar datos en forma de DataFrames.
  • Funciones adicionales: Como from_unixtime para convertir marcas de tiempo de Unix a formatos legibles.

2. Configuración Inicial

  • Se crea un contexto de Spark y Glue para habilitar el procesamiento distribuido.
  • Se inicializa un objeto Job para rastrear y gestionar el trabajo en Glue.

3. Lectura de Datos

  • Los datos se leen desde un catálogo de AWS Glue (base de datos actividad_3 y tabla raw_events_actividad_3).
  • Los datos se cargan en un DynamicFrame, que luego se convierte en un DataFrame para realizar transformaciones avanzadas.

4. Procesamiento de Datos por Evento

Se procesan los datos según el tipo de evento:

LootBox (lootbox_opened)

  • Filtra eventos del tipo lootbox_opened.
  • Selecciona columnas específicas, como el ID de la LootBox y los costos asociados.
  • Convierte las marcas de tiempo en un formato legible.
  • Ordena los datos por la fecha del evento (event_datetime) y elimina duplicados.

Niveles (level_started, level_completed, level_failed)

  • Filtra eventos relacionados con niveles.
  • Extrae información como el ID del nivel y la versión asociada.
  • Transforma y ordena los datos de forma similar.

Transacciones de Compra (iap_transaction)

  • Filtra eventos relacionados con transacciones dentro de la aplicación.
  • Obtiene información sobre los artículos comprados, costos y tipos de moneda.

Partidas (match_start, match_end)

  • Filtra datos de inicio y fin de partidas.
  • Extrae columnas relevantes como el resultado de la partida y el hechizo más usado.

5. Guardado en la Capa Silver

  • Los DataFrames procesados se almacenan en Amazon S3, organizados por tipo de evento.
  • Configura particiones de tiempo (año, mes, día) para optimizar las consultas posteriores.
  • Se actualiza automáticamente la información del catálogo de Glue con el esquema y las particiones correspondientes.

6. Commit del Trabajo

  • Finaliza el trabajo en Glue mediante job.commit().

Salidas del Proceso

El script genera los siguientes conjuntos de datos procesados en la capa Silver:

Nombre de Salida Tipo de Evento Ubicación en S3
lootbox_open Eventos de apertura de LootBoxes s3://capa-silver-gameanalytics/lootbox_open/
levels Eventos de niveles (iniciados, terminados) s3://capa-silver-gameanalytics/levels/
match Eventos de inicio y fin de partidas s3://capa-silver-gameanalytics/match/
iaps Transacciones dentro de la aplicación s3://capa-silver-gameanalytics/iaps/

Consideraciones

  1. Particiones: Los datos están organizados por year, month y day para mejorar el rendimiento de las consultas.
  2. Formato de Salida: Los datos se almacenan en formato Parquet con compresión Snappy.
  3. Catálogo de Glue: Cada conjunto de datos actualizado incluye la información en el catálogo de Glue para facilitar el acceso mediante AWS Athena o Spark.

Mejoras Futuras

  • Incorporar validaciones de calidad de datos antes de guardar en la capa Silver.
  • Agregar más tipos de eventos según las necesidades de análisis.
  • Implementar monitoreo del pipeline para detectar errores o inconsistencias.

Estructura del Código: Procesamiento de la capa Silver

El script se organiza en las siguientes secciones principales:

  1. Importación de Módulos
    Importa las bibliotecas necesarias para el procesamiento de datos en AWS Glue y Apache Spark, como awsglue, pyspark.sql.functions y otras.

  2. Inicialización del Contexto de Glue
    Configura el contexto de Glue y Spark, incluyendo la creación de un objeto Job para la gestión del trabajo.

  3. Lectura de Datos desde la Capa Raw

    • Carga los datos desde un catálogo de Glue utilizando create_dynamic_frame.from_catalog.
    • Convierte el DynamicFrame en un DataFrame de Spark para mayor flexibilidad en el procesamiento.
  4. Procesamiento por Tipo de Evento

    • Filtra y transforma los datos según diferentes tipos de eventos (lootbox_opened, level_started, iap_transaction, match_start, match_end).
    • Realiza operaciones como selección de columnas, conversión de timestamps y eliminación de duplicados.
    • Ordena los datos procesados por la columna event_datetime.
  5. Guardado de los Datos Procesados en la Capa Silver

    • Crea un diccionario data_to_save que organiza los DataFrames procesados por tipo de evento.
    • Itera sobre el diccionario, convirtiendo cada DataFrame en un DynamicFrame para guardarlo en S3.
    • Configura las particiones por año, mes y día, y utiliza formato Parquet comprimido con Snappy.
    • Actualiza la información del catálogo de Glue para cada tabla procesada.
  6. Commit del Trabajo
    Finaliza el trabajo con job.commit() para asegurar que se registre correctamente en AWS Glue.

In [ ]:
## Procesamiento de la capa Silver

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

dyf = glueContext.create_dynamic_frame.from_catalog(database='actividad_3', table_name='raw_events_actividad_3')
dyf.printSchema()

df = dyf.toDF()

# Ejemplo de LootBox ID
loot_box_df = (
    df.where("event.event_type == 'lootbox_opened'")
      .select(
          "event.event_type",
          # Convert Unix epoch timestamp to readable datetime
          from_unixtime((col("event.event_timestamp")).cast("long")).alias("event_datetime"),
          "event.event_data.lootbox_id",
          "event.event_data.lootbox_cost",
          "event.event_data.item_cost",
          col("partition_0").alias("year"),
          col("partition_1").alias("month"),
          col("partition_2").alias("day")
      )
      .dropDuplicates()
      .orderBy(col("event_datetime"))
)


level_df = (
    df.where("event.event_type in ('level_started', 'level_completed', 'level_failed')")
      .select(
          "event.event_type",
          # Convert Unix epoch timestamp to readable datetime
          from_unixtime((col("event.event_timestamp")).cast("long")).alias("event_datetime"),
          "event.event_data.level_id",
          "event.event_data.level_version",
          col("partition_0").alias("year"),
          col("partition_1").alias("month"),
          col("partition_2").alias("day")
      )
      .dropDuplicates()
      .orderBy(col("event_datetime"))
)


iap_df = (
    df.where("event.event_type in ('iap_transaction')")
      .select(
          "event.event_type",
          # Convert Unix epoch timestamp to readable datetime
          from_unixtime((col("event.event_timestamp")).cast("long")).alias("event_datetime"),
          "event.event_data.item_id",
          "event.event_data.item_version",
          "event.event_data.item_amount",
          "event.event_data.currency_type",
          "event.event_data.country_id",
          "event.event_data.currency_amount",
          "event.event_data.transaction_id",
          col("partition_0").alias("year"),
          col("partition_1").alias("month"),
          col("partition_2").alias("day")
      )
      .dropDuplicates()
      .orderBy(col("event_datetime"))
)


match_df = (
    df.where("event.event_type in ('match_start', 'match_end')")
      .select(
          "event.event_type",
          # Convert Unix epoch timestamp to readable datetime
          from_unixtime((col("event.event_timestamp")).cast("long")).alias("event_datetime"),
          "event.event_data.match_id",
          "event.event_data.map_id",
          "event.event_data.match_result_type",
          "event.event_data.exp_gained",
          "event.event_data.most_used_spell",
          col("partition_0").alias("year"),
          col("partition_1").alias("month"),
          col("partition_2").alias("day")
      )
      .dropDuplicates()
      .orderBy(col("event_datetime"))
)
# Save Data 

data_to_save = {
    'lootbox_open': loot_box_df,
    'levels': level_df,
    'match': match_df,
    'iaps': iap_df
}

for name, df in data_to_save.items():

    dyf = DynamicFrame.fromDF(df, glueContext, name)

    s3output = glueContext.getSink(
        path=f"s3://capa-silver-gameanalytics/{name}/",
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["year", "month", "day"],
        compression="snappy",
        enableUpdateCatalog=True,
        transformation_ctx="s3output",
    )

    # Update Glue Catalog Table Information
    s3output.setCatalogInfo(
        catalogDatabase="silver", 
        catalogTableName=name
    )

    # Set Output Format
    s3output.setFormat("glueparquet")

    # Write DynamicFrame to S3

    s3output.writeFrame(dyf)
job.commit()